/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.kudu;



import com.google.auto.value.AutoValue;

import com.bff.gaia.unified.sdk.Pipeline;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.CannotProvideCoderException;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.CoderRegistry;

import com.bff.gaia.unified.sdk.io.BoundedSource;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PDone;

import com.bff.gaia.unified.sdk.values.TypeDescriptors;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Splitter;

import org.apache.kudu.Common;

import org.apache.kudu.client.KuduException;

import org.apache.kudu.client.KuduPredicate;

import org.apache.kudu.client.Operation;

import org.apache.kudu.client.RowResult;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.util.Collections;

import java.util.List;

import java.util.stream.Collectors;

import java.util.stream.Stream;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



/**

 * A bounded source and sink for Kudu.

 *

 * <p>For more information, see the online documentation at <a

 * href="https://kudu.apache.org/">Kudu</a>.

 *

 * <h3>Reading from Kudu</h3>

 *

 * <p>{@code KuduIO} provides a source to read and returns a bounded collection of entities as

 * {@code PCollection&lt;T&gt;}. An entity is built by parsing a Kudu {@link RowResult} using the

 * provided {@link SerializableFunction}.

 *

 * <p>The following example illustrates various options for configuring the IO:

 *

 * <pre>{@code

 * pipeline.apply(

 *     KuduIO.<String>read()

 *         .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")

 *         .withTable("table")

 *         .withParseFn(

 *             (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))

 *         .withCoder(StringUtf8Coder.of()));

 *     // above options illustrate a typical minimum set, returns PCollection<String>

 * }</pre>

 *

 * <p>{@code withCoder(...)} may be omitted if it can be inferred from the {@link CoderRegistry}.

 * However, when using a Lambda Expression or an anonymous inner class to define the function, type

 * erasure will prohibit this. In such cases you are required to explicitly set the coder as in the

 * above example.

 *

 * <p>Optionally, you can provide {@code withPredicates(...)} to apply a query to filter rows from

 * the kudu table.

 *

 * <p>Optionally, you can provide {@code withProjectedColumns(...)} to limit the columns returned

 * from the Kudu scan to improve performance. The columns required in the {@code ParseFn} must be

 * declared in the projected columns.

 *

 * <p>Optionally, you can provide {@code withBatchSize(...)} to set the number of bytes returned

 * from the Kudu scanner in each batch.

 *

 * <p>Optionally, you can provide {@code withFaultTolerent(...)} to enforce the read scan to resume

 * a scan on another tablet server if the current server fails.

 *

 * <h3>Writing to Kudu</h3>

 *

 * <p>The Kudu sink executes a set of operations on a single table. It takes as input a {@link

 * PCollection PCollection} and a {@link FormatFunction} which is responsible for converting the

 * input into an idempotent transformation on a row.

 *

 * <p>To configure a Kudu sink, you must supply the Kudu master addresses, the table name and a

 * {@link FormatFunction} to convert the input records, for example:

 *

 * <pre>{@code

 * PCollection<MyType> data = ...;

 * FormatFunction<MyType> fn = ...;

 *

 * data.apply("write",

 *     KuduIO.write()

 *         .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")

 *         .withTable("table")

 *         .withFormatFn(fn));

 * }</pre>

 *

 * <h3>Experimental</h3>

 *

 * {@code KuduIO} does not support authentication in this release.

 */

@Experimental(Experimental.Kind.SOURCE_SINK)

public class KuduIO {

  private static final Logger LOG = LoggerFactory.getLogger(KuduIO.class);

  /** Not instantiable; transforms are obtained from the static {@link #read()} and {@link #write()}. */
  private KuduIO() {}



  /**
   * Creates an unconfigured {@link Read} transform.
   *
   * <p>The returned transform is wired to a fresh {@link KuduServiceImpl}; every other property is
   * left unset and must be supplied through the {@code withXxx(...)} methods on {@link Read}.
   *
   * @param <T> the type of the elements produced by the read
   * @return a new {@link Read} with only its Kudu service set
   */
  public static <T> Read<T> read() {
    Read.Builder<T> readBuilder = new AutoValue_KuduIO_Read.Builder<>();
    KuduService<T> defaultService = new KuduServiceImpl<>();
    return readBuilder.setKuduService(defaultService).build();
  }



  /**
   * Creates an unconfigured {@link Write} transform.
   *
   * <p>The returned transform is wired to a fresh {@link KuduServiceImpl}; every other property is
   * left unset and must be supplied through the {@code withXxx(...)} methods on {@link Write}.
   *
   * @param <T> the type of the elements consumed by the write
   * @return a new {@link Write} with only its Kudu service set
   */
  public static <T> Write<T> write() {
    Write.Builder<T> writeBuilder = new AutoValue_KuduIO_Write.Builder<>();
    KuduService<T> defaultService = new KuduServiceImpl<>();
    return writeBuilder.setKuduService(defaultService).build();
  }



  /**
   * An interface used by the KuduIO Write to convert an input record into an Operation to apply as
   * a mutation in Kudu.
   *
   * <p>It is a {@link SerializableFunction} from a {@link TableAndRecord} of {@code T} to a Kudu
   * {@link Operation}. It declares no methods of its own, so it can be implemented with a lambda
   * or method reference.
   *
   * @param <T> the type of the input records being written
   */
  @FunctionalInterface
  public interface FormatFunction<T> extends SerializableFunction<TableAndRecord<T>, Operation> {}



  /**
   * Implementation of {@link KuduIO#read()}.
   *
   * <p>Every {@code withXxx(...)} method leaves this instance untouched and returns a new {@code
   * Read} obtained from {@link #builder()} with a single property replaced.
   *
   * @param <T> the type of the elements produced by the parse function
   */
  @AutoValue
  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
    @Nullable
    abstract List<String> getMasterAddresses();

    @Nullable
    abstract String getTable();

    @Nullable
    abstract Integer getBatchSize();

    @Nullable
    abstract List<String> getProjectedColumns();

    @Nullable
    abstract List<Common.ColumnPredicatePB> getSerializablePredicates();

    @Nullable
    abstract Boolean getFaultTolerent();

    @Nullable
    abstract SerializableFunction<RowResult, T> getParseFn();

    @Nullable
    abstract Coder<T> getCoder();

    @Nullable
    abstract KuduService<T> getKuduService();

    /** Returns a builder pre-populated with the properties of this instance. */
    abstract Builder<T> builder();

    /** AutoValue builder backing the {@code withXxx(...)} methods of {@link Read}. */
    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setMasterAddresses(List<String> masterAddresses);

      abstract Builder<T> setTable(String table);

      abstract Builder<T> setBatchSize(Integer batchSize);

      abstract Builder<T> setProjectedColumns(List<String> projectedColumns);

      abstract Builder<T> setSerializablePredicates(
          List<Common.ColumnPredicatePB> serializablePredicates);

      abstract Builder<T> setFaultTolerent(Boolean faultTolerent);

      abstract Builder<T> setParseFn(SerializableFunction<RowResult, T> parseFn);

      abstract Builder<T> setCoder(Coder<T> coder);

      abstract Builder<T> setKuduService(KuduService<T> kuduService);

      abstract Read<T> build();
    }

    /**
     * Resolves the coder for the elements produced by this read.
     *
     * <p>An explicitly configured coder always wins. Otherwise the coder is looked up in the given
     * registry from the output type of the parse function.
     *
     * @param coderRegistry the registry consulted when no coder was set via {@link #withCoder}
     * @return the explicit coder, or the one inferred from the parse function's output type
     * @throws IllegalStateException if neither a coder nor a parse function has been set
     * @throws IllegalArgumentException if the registry cannot provide a coder for the output type
     */
    @VisibleForTesting
    Coder<T> inferCoder(CoderRegistry coderRegistry) {
      Coder<T> explicitCoder = getCoder();
      if (explicitCoder != null) {
        return explicitCoder;
      }
      // Without this guard a missing parseFn surfaces as a bare NullPointerException from
      // TypeDescriptors.outputOf(null) at pipeline construction time.
      checkState(
          getParseFn() != null,
          "KuduIO.read() requires a parse function to be set via withParseFn(parseFn)");
      try {
        return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
      } catch (CannotProvideCoderException e) {
        throw new IllegalArgumentException(
            "Unable to infer coder for output of parseFn ("
                + TypeDescriptors.outputOf(getParseFn())
                + "). Specify it explicitly using withCoder().",
            e);
      }
    }

    /**
     * Reads from the Kudu cluster on the specified master addresses.
     *
     * @param masterAddresses comma-separated list of master addresses; must be non-null and
     *     non-empty
     * @throws IllegalArgumentException if {@code masterAddresses} is null or empty
     */
    public Read<T> withMasterAddresses(String masterAddresses) {
      // The message has always promised "null or empty"; enforce both halves of it.
      checkArgument(
          masterAddresses != null && !masterAddresses.isEmpty(),
          "masterAddresses cannot be null or empty");
      return builder().setMasterAddresses(Splitter.on(",").splitToList(masterAddresses)).build();
    }

    /** Reads from the specified table. */
    public Read<T> withTable(String table) {
      checkArgument(table != null, "table cannot be null");
      return builder().setTable(table).build();
    }

    /** Provides the function to parse a row from Kudu into the typed object. */
    public Read<T> withParseFn(SerializableFunction<RowResult, T> parseFn) {
      checkArgument(parseFn != null, "parseFn cannot be null");
      return builder().setParseFn(parseFn).build();
    }

    /** Filters the rows read from Kudu using the given predicates. */
    public Read<T> withPredicates(List<KuduPredicate> predicates) {
      checkArgument(predicates != null, "predicates cannot be null");
      // reuse the kudu protobuf serialization mechanism
      List<Common.ColumnPredicatePB> serializablePredicates =
          predicates.stream().map(KuduPredicate::toPB).collect(Collectors.toList());
      return builder().setSerializablePredicates(serializablePredicates).build();
    }

    /** Filters the columns read from the table to include only those specified. */
    public Read<T> withProjectedColumns(List<String> projectedColumns) {
      checkArgument(projectedColumns != null, "projectedColumns cannot be null");
      return builder().setProjectedColumns(projectedColumns).build();
    }

    /**
     * Reads from the table in batches of the specified size.
     *
     * @throws IllegalArgumentException if {@code batchSize} is negative
     */
    public Read<T> withBatchSize(int batchSize) {
      checkArgument(batchSize >= 0, "batchSize must not be negative");
      return builder().setBatchSize(batchSize).build();
    }

    /**
     * Instructs the read scan to resume a scan on another tablet server if the current server fails
     * and faultTolerant is set to true.
     */
    public Read<T> withFaultTolerent(boolean faultTolerent) {
      return builder().setFaultTolerent(faultTolerent).build();
    }

    /**
     * Sets a {@link Coder} for the result of the parse function. This may be required if a coder
     * can not be inferred automatically.
     */
    public Read<T> withCoder(Coder<T> coder) {
      checkArgument(coder != null, "coder cannot be null");
      return builder().setCoder(coder).build();
    }

    /** Specify an instance of {@link KuduService} used to connect and read from Kudu. */
    @VisibleForTesting
    Read<T> withKuduService(KuduService<T> kuduService) {
      checkArgument(kuduService != null, "kuduService cannot be null");
      return builder().setKuduService(kuduService).build();
    }

    /**
     * Expands this transform into a bounded read backed by a {@link KuduSource}.
     *
     * <p>The output coder is resolved once here via {@link #inferCoder} and handed to the source.
     */
    @Override
    public PCollection<T> expand(PBegin input) {
      Pipeline p = input.getPipeline();
      final Coder<T> coder = inferCoder(p.getCoderRegistry());
      return input.apply(com.bff.gaia.unified.sdk.io.Read.from(new KuduSource<>(this, coder, null)));
    }

    /**
     * Checks that the mandatory properties (master addresses, table and parse function) are set.
     *
     * @throws IllegalStateException if any of them is missing
     */
    @Override
    public void validate(PipelineOptions pipelineOptions) {
      checkState(
          getMasterAddresses() != null,
          "KuduIO.read() requires a list of master addresses to be set via withMasterAddresses(masterAddresses)");
      // The setter is withTable(table); the message previously pointed at a non-existent method.
      checkState(
          getTable() != null, "KuduIO.read() requires a table name to be set via withTable(table)");
      checkState(
          getParseFn() != null,
          "KuduIO.read() requires a parse function to be set via withParseFn(parseFn)");
    }

    /** Registers the master addresses and table as display data, skipping any that are unset. */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      // Both properties are @Nullable until configured; do not dereference or publish a null.
      List<String> masterAddresses = getMasterAddresses();
      if (masterAddresses != null) {
        builder.add(DisplayData.item("masterAddresses", masterAddresses.toString()));
      }
      String table = getTable();
      if (table != null) {
        builder.add(DisplayData.item("table", table));
      }
    }
  }



  /**
   * A {@link BoundedSource} that reads the table described by a {@link Read} spec.
   *
   * <p>A source created with a {@code null} token represents the whole read and can be split once
   * into one source per entry returned by {@link KuduService#createTabletScanners}. A source
   * carrying a token is already a split and is returned unchanged by {@link #split}.
   *
   * @param <T> the type of the elements produced by the source
   */
  static class KuduSource<T> extends BoundedSource<T> {
    final Read<T> spec;
    private final Coder<T> coder;
    @Nullable byte[] serializedToken; // only during a split

    /**
     * @param spec the read configuration
     * @param coder the coder reported by {@link #getOutputCoder()}
     * @param serializedToken the scanner token of this split, or {@code null} for the unsplit
     *     source
     */
    KuduSource(Read<T> spec, Coder<T> coder, @Nullable byte[] serializedToken) {
      this.spec = spec;
      this.coder = coder;
      this.serializedToken = serializedToken;
    }

    // A Kudu source can be split once only providing a source per tablet
    @Override
    public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options)
        throws KuduException {
      if (serializedToken != null) {
        return Collections.singletonList(this); // we are already a split
      }
      // Children inherit this source's resolved coder. spec.getCoder() is null whenever the coder
      // was inferred from the registry, which would leave every split without an output coder.
      Stream<BoundedSource<T>> sources =
          spec.getKuduService().createTabletScanners(spec).stream()
              .map(token -> new KuduIO.KuduSource<T>(spec, coder, token));
      return sources.collect(Collectors.toList());
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) {
      return 0; // Kudu does not expose tablet sizes
    }

    /** Delegates reader creation to the {@link KuduService} configured on the spec. */
    @Override
    public BoundedReader<T> createReader(PipelineOptions options) {
      return spec.getKuduService().createReader(this);
    }

    @Override
    public Coder<T> getOutputCoder() {
      return coder;
    }
  }



  /**
   * A {@link PTransform} that writes to Kudu. See the class-level Javadoc on {@link KuduIO} for
   * more information.
   *
   * <p>Every {@code withXxx(...)} method leaves this instance untouched and returns a new {@code
   * Write} obtained from {@link #builder()} with a single property replaced.
   *
   * @param <T> the type of the input records
   * @see KuduIO
   */
  @AutoValue
  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
    @Nullable
    abstract List<String> masterAddresses();

    @Nullable
    abstract String table();

    @Nullable
    abstract FormatFunction<T> formatFn();

    @Nullable
    abstract KuduService<T> kuduService();

    /** Returns a builder pre-populated with the properties of this instance. */
    abstract Builder<T> builder();

    /** AutoValue builder backing the {@code withXxx(...)} methods of {@link Write}. */
    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setMasterAddresses(List<String> masterAddresses);

      abstract Builder<T> setTable(String table);

      abstract Builder<T> setFormatFn(FormatFunction<T> formatFn);

      abstract Builder<T> setKuduService(KuduService<T> kuduService);

      abstract Write<T> build();
    }

    // NOTE(review): the three public with* methods below return the raw Write type. That is kept
    // on purpose: narrowing it to Write<T> would stop un-witnessed chains such as
    // KuduIO.write().withMasterAddresses(..).withFormatFn(fn) from compiling.

    /**
     * Writes to the Kudu cluster on the specified master addresses.
     *
     * @param masterAddresses comma-separated list of master addresses; must be non-null and
     *     non-empty
     * @throws IllegalArgumentException if {@code masterAddresses} is null or empty
     */
    public Write withMasterAddresses(String masterAddresses) {
      // The message has always promised "null or empty"; enforce both halves of it.
      checkArgument(
          masterAddresses != null && !masterAddresses.isEmpty(),
          "masterAddresses cannot be null or empty");
      return builder().setMasterAddresses(Splitter.on(",").splitToList(masterAddresses)).build();
    }

    /** Writes to the specified table. */
    public Write withTable(String table) {
      checkArgument(table != null, "table cannot be null");
      return builder().setTable(table).build();
    }

    /** Writes using the given function to create the mutation operations from the input. */
    public Write withFormatFn(FormatFunction<T> formatFn) {
      checkArgument(formatFn != null, "formatFn cannot be null");
      return builder().setFormatFn(formatFn).build();
    }

    /** Specify the {@link KuduService} used to connect and write into the Kudu table. */
    @VisibleForTesting
    Write<T> withKuduService(KuduService<T> kuduService) {
      checkArgument(kuduService != null, "kuduService cannot be null");
      return builder().setKuduService(kuduService).build();
    }

    /** Applies a {@link ParDo} that writes every input element through a {@link WriteFn}. */
    @Override
    public PDone expand(PCollection<T> input) {
      input.apply(ParDo.of(new WriteFn(this)));
      return PDone.in(input.getPipeline());
    }

    /**
     * Checks that the mandatory properties (master addresses, table and format function) are set.
     *
     * @throws IllegalStateException if any of them is missing
     */
    @Override
    public void validate(PipelineOptions pipelineOptions) {
      checkState(
          masterAddresses() != null,
          "KuduIO.write() requires a list of master addresses to be set via withMasterAddresses(masterAddresses)");
      checkState(
          table() != null, "KuduIO.write() requires a table name to be set via withTable(table)");
      checkState(
          formatFn() != null,
          "KuduIO.write() requires a format function to be set via withFormatFn(formatFn)");
    }

    /**
     * Registers the master addresses, table and format function class as display data, skipping
     * any property that is unset.
     */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      // All three properties are @Nullable until configured; do not dereference or publish a null.
      List<String> addresses = masterAddresses();
      if (addresses != null) {
        builder.add(DisplayData.item("masterAddresses", addresses.toString()));
      }
      String tableName = table();
      if (tableName != null) {
        builder.add(DisplayData.item("tableName", tableName));
      }
      FormatFunction<T> fn = formatFn();
      if (fn != null) {
        // getCanonicalName() is null for anonymous and local classes; fall back to the binary name.
        String fnClassName = fn.getClass().getCanonicalName();
        if (fnClassName == null) {
          fnClassName = fn.getClass().getName();
        }
        builder.add(DisplayData.item("formatFn", fnClassName));
      }
    }

    /**
     * {@link DoFn} that pushes each element to Kudu through a {@link KuduService.Writer}.
     *
     * <p>The writer is created in {@link #setup()}, a session is opened and closed around every
     * bundle, and the writer is closed in {@link #teardown()}.
     */
    private class WriteFn extends DoFn<T, Void> {
      private final Write<T> spec;
      private KuduService.Writer writer;

      WriteFn(Write<T> spec) {
        this.spec = spec;
      }

      @Setup
      public void setup() throws KuduException {
        writer = spec.kuduService().createWriter(spec);
      }

      @StartBundle
      public void startBundle(StartBundleContext context) throws KuduException {
        writer.openSession();
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws KuduException {
        writer.write(c.element());
      }

      @FinishBundle
      public void finishBundle() throws Exception {
        writer.closeSession();
      }

      @Teardown
      public void teardown() throws Exception {
        // The writer is null if it was never created (e.g. createWriter failed in setup);
        // closing it unconditionally would throw a NullPointerException.
        if (writer == null) {
          return;
        }
        try {
          writer.close();
        } finally {
          writer = null;
        }
      }

      @Override
      public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        // Skip unset properties rather than dereferencing or publishing a null.
        List<String> addresses = spec.masterAddresses();
        if (addresses != null) {
          builder.add(DisplayData.item("masterAddresses", addresses.toString()));
        }
        String tableName = spec.table();
        if (tableName != null) {
          builder.add(DisplayData.item("table", tableName));
        }
      }
    }
  }

}